In [6]:
import os
import pandas as pd

# Specify the folder containing your subset of images and the CSV file path
images_folder = 'subset_images'
csv_path = 'list_attr_celeba.csv'

# Read the CSV file into a DataFrame
df = pd.read_csv(csv_path)

# Get a set of image filenames present in the subset_images folder
existing_images = set(os.listdir(images_folder))

# Filter the DataFrame to only include rows where the image_id exists in the folder
df_subset = df[df['image_id'].isin(existing_images)]
df_subset = df_subset.reset_index(drop=True)

print("Total rows in original CSV:", len(df))
print("Total rows in subset CSV:", len(df_subset))

# Optionally, save the subset to a new CSV file for future use:
df_subset.to_csv("subset_list_attr_celeba.csv", index=False)

# Get attribute names from the CSV (all columns except the first "image_id")
attribute_names = list(df_subset.columns[1:])
print("Attribute Names:")
print(attribute_names)
Total rows in original CSV: 202599
Total rows in subset CSV: 15000
Attribute Names:
['5_o_Clock_Shadow', 'Arched_Eyebrows', 'Attractive', 'Bags_Under_Eyes', 'Bald', 'Bangs', 'Big_Lips', 'Big_Nose', 'Black_Hair', 'Blond_Hair', 'Blurry', 'Brown_Hair', 'Bushy_Eyebrows', 'Chubby', 'Double_Chin', 'Eyeglasses', 'Goatee', 'Gray_Hair', 'Heavy_Makeup', 'High_Cheekbones', 'Male', 'Mouth_Slightly_Open', 'Mustache', 'Narrow_Eyes', 'No_Beard', 'Oval_Face', 'Pale_Skin', 'Pointy_Nose', 'Receding_Hairline', 'Rosy_Cheeks', 'Sideburns', 'Smiling', 'Straight_Hair', 'Wavy_Hair', 'Wearing_Earrings', 'Wearing_Hat', 'Wearing_Lipstick', 'Wearing_Necklace', 'Wearing_Necktie', 'Young']
In [7]:
import os
import pandas as pd
from PIL import Image
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader, random_split
import torch

class CelebASubsetDataset(Dataset):
    def __init__(self, images_dir, csv_file, transform=None):
        """
        Args:
            images_dir (str): Path to the directory containing the images.
            csv_file (str): Path to the CSV file with image attributes.
            transform (callable, optional): Optional transform to be applied on an image.
        """
        self.images_dir = images_dir
        self.transform = transform
        
        # Read the CSV file into a DataFrame
        self.attr_df = pd.read_csv(csv_file)
        
        # Assuming the first column is 'image_id' and the rest are attributes, we store the attribute names.
        self.image_ids = self.attr_df['image_id'].values
        # Get attribute columns (all columns besides 'image_id')
        self.attributes = self.attr_df.drop(columns=['image_id']).values.astype('float32')
        
    def __len__(self):
        return len(self.image_ids)
    
    def __getitem__(self, idx):
        # Get the image file name and its corresponding attributes
        img_id = self.image_ids[idx]
        attr = self.attributes[idx]
        
        # Construct the full image path
        img_path = os.path.join(self.images_dir, img_id)
        
        # Open the image file and ensure it is in RGB mode.
        image = Image.open(img_path).convert('RGB')
        
        if self.transform:
            image = self.transform(image)
        
        # Convert the attributes to a tensor
        attr_tensor = torch.tensor(attr)
        
        return image, attr_tensor

# Define a transform pipeline.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])

# Create the full dataset instance.
dataset = CelebASubsetDataset(
    images_dir='subset_images',
    csv_file='subset_list_attr_celeba.csv',
    transform=transform
)

# Define a split proportion for training and validation.
# For example, an 80/20 split:
train_size = int(0.999 * len(dataset))
valid_size = len(dataset) - train_size

# Split the dataset using random_split.
train_dataset, valid_dataset = random_split(dataset, [train_size, valid_size])

# Create DataLoaders for both splits.
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
valid_dataloader = DataLoader(valid_dataset, batch_size=32, shuffle=False, num_workers=4)

# Optional: Print out the sizes for confirmation.
print(f"Total dataset size: {len(dataset)}")
print(f"Training set size: {len(train_dataset)}")
print(f"Validation set size: {len(valid_dataset)}")
Total dataset size: 15000
Training set size: 14985
Validation set size: 15
In [8]:
import matplotlib.pyplot as plt
import numpy as np

# Retrieve a sample from the dataset. For example, the first sample
sample_img, sample_attr = dataset[0]

# Print the attribute names with their corresponding attribute values for the sample
print("\nSample Image Attributes:")
for name, value in zip(attribute_names, sample_attr.tolist()):
    print(f"{name}: {value}")

# Convert the sample image tensor from (C, H, W) to (H, W, C) for visualization
img_np = sample_img.permute(1, 2, 0).numpy()

# Display the sample image
plt.figure(figsize=(6, 6))
plt.imshow(img_np)
plt.title("Sample Image")
plt.axis('off')
plt.show()
Sample Image Attributes:
5_o_Clock_Shadow: 1.0
Arched_Eyebrows: -1.0
Attractive: 1.0
Bags_Under_Eyes: 1.0
Bald: -1.0
Bangs: -1.0
Big_Lips: 1.0
Big_Nose: 1.0
Black_Hair: 1.0
Blond_Hair: -1.0
Blurry: -1.0
Brown_Hair: -1.0
Bushy_Eyebrows: 1.0
Chubby: -1.0
Double_Chin: -1.0
Eyeglasses: -1.0
Goatee: -1.0
Gray_Hair: -1.0
Heavy_Makeup: -1.0
High_Cheekbones: -1.0
Male: 1.0
Mouth_Slightly_Open: -1.0
Mustache: -1.0
Narrow_Eyes: -1.0
No_Beard: 1.0
Oval_Face: -1.0
Pale_Skin: -1.0
Pointy_Nose: 1.0
Receding_Hairline: -1.0
Rosy_Cheeks: -1.0
Sideburns: -1.0
Smiling: -1.0
Straight_Hair: 1.0
Wavy_Hair: -1.0
Wearing_Earrings: -1.0
Wearing_Hat: -1.0
Wearing_Lipstick: -1.0
Wearing_Necklace: -1.0
Wearing_Necktie: -1.0
Young: 1.0
No description has been provided for this image
In [9]:
hat_df = df_subset[df_subset['Wearing_Hat'] == 1]

if hat_df.empty:
    print("No images with Wearing_Hat attribute equal to 1 found.")
else:
    # Get the first matching row's index within the subset DataFrame
    sample_index = hat_df.index[1]
    
    # Retrieve the sample from the dataset
    sample_img, sample_attr = dataset[sample_index]
    
    # Print attribute names and values for this sample
    print("\nAttributes for the image with Wearing_Hat = 1:")
    for name, value in zip(attribute_names, sample_attr.tolist()):
        print(f"{name}: {value}")

    # Prepare the image for display (tensor from (C, H, W) to (H, W, C))
    img_np = sample_img.permute(1, 2, 0).numpy()
    
    # Display the image using matplotlib
    plt.figure(figsize=(6, 6))
    plt.imshow(img_np)
    plt.title("Image with Wearing_Hat = 1")
    plt.axis('off')
    plt.show()
Attributes for the image with Wearing_Hat = 1:
5_o_Clock_Shadow: -1.0
Arched_Eyebrows: -1.0
Attractive: -1.0
Bags_Under_Eyes: 1.0
Bald: -1.0
Bangs: -1.0
Big_Lips: -1.0
Big_Nose: 1.0
Black_Hair: -1.0
Blond_Hair: -1.0
Blurry: -1.0
Brown_Hair: -1.0
Bushy_Eyebrows: -1.0
Chubby: -1.0
Double_Chin: -1.0
Eyeglasses: -1.0
Goatee: -1.0
Gray_Hair: -1.0
Heavy_Makeup: -1.0
High_Cheekbones: 1.0
Male: 1.0
Mouth_Slightly_Open: 1.0
Mustache: -1.0
Narrow_Eyes: -1.0
No_Beard: 1.0
Oval_Face: -1.0
Pale_Skin: -1.0
Pointy_Nose: -1.0
Receding_Hairline: -1.0
Rosy_Cheeks: -1.0
Sideburns: -1.0
Smiling: 1.0
Straight_Hair: -1.0
Wavy_Hair: -1.0
Wearing_Earrings: -1.0
Wearing_Hat: 1.0
Wearing_Lipstick: -1.0
Wearing_Necklace: -1.0
Wearing_Necktie: -1.0
Young: -1.0
No description has been provided for this image
In [54]:
hat_df.index[1]
Out[54]:
4
In [10]:
import math
import torch
import torch.nn as nn
import torch.optim as optim
import pytorch_lightning as pl
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import torch.nn.functional as F

class BetaVAE(pl.LightningModule):
    def __init__(self, latent_dim=64, beta=4.0, learning_rate=1e-3, image_channels=3, sample_interval=5):
        """
        Args:
            latent_dim (int): Dimensionality of the latent space.
            beta (float): Scaling factor for the KL divergence.
            learning_rate (float): Optimizer learning rate.
            image_channels (int): Number of input image channels.
            sample_interval (int): Run reconstruction visualization every x epochs.
        """
        super(BetaVAE, self).__init__()
        self.latent_dim = latent_dim
        self.beta = beta
        self.learning_rate = learning_rate
        self.image_channels = image_channels
        self.sample_interval = sample_interval

        # --- Encoder ---
        self.encoder = nn.Sequential(
            nn.Conv2d(image_channels, 32, kernel_size=4, stride=2, padding=1),  # 224 -> 112
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=1),              # 112 -> 56
            nn.ReLU(),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),             # 56 -> 28
            nn.ReLU(),
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),            # 28 -> 14
            nn.ReLU()
        )
        self.flatten_dim = 256 * 14 * 14
        self.fc_mu = nn.Linear(self.flatten_dim, latent_dim)
        self.fc_logvar = nn.Linear(self.flatten_dim, latent_dim)
        
        # --- Decoder ---
        self.decoder_input = nn.Linear(latent_dim, self.flatten_dim)
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1),   # 14 -> 28
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),    # 28 -> 56
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1),     # 56 -> 112
            nn.ReLU(),
            nn.ConvTranspose2d(32, image_channels, kernel_size=4, stride=2, padding=1),  # 112 -> 224
            nn.Sigmoid()
        )

    def encode(self, x):
        x = self.encoder(x)
        x = x.view(x.size(0), -1)
        mu = self.fc_mu(x)
        logvar = self.fc_logvar(x)
        return mu, logvar

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z):
        x = self.decoder_input(z)
        x = x.view(-1, 256, 14, 14)
        x = self.decoder(x)
        return x

    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        x_recon = self.decode(z)
        return x_recon, mu, logvar

    def loss_function(self, x, x_recon, mu, logvar):
        recon_loss = F.binary_cross_entropy(x_recon, x, reduction='sum')
        kl_div = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
        loss = recon_loss + self.beta * kl_div
        return loss, recon_loss, kl_div

    def training_step(self, batch, batch_idx):
        x, _ = batch  # Use only images.
        x_recon, mu, logvar = self.forward(x)
        loss, recon_loss, kl_div = self.loss_function(x, x_recon, mu, logvar)
        self.log("train_loss", loss, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        x, _ = batch
        x_recon, mu, logvar = self.forward(x)
        loss, recon_loss, kl_div = self.loss_function(x, x_recon, mu, logvar)
        self.log("val_loss", loss, prog_bar=True)
        return loss

    def log_validation_reconstructions(self):
        """
        Access the first batch from the validation dataloader,
        use its first 15 images for visualization, and display a figure with
        3 rows of 5 originals alongside 3 rows of 5 reconstructions.
        """
        # Retrieve the validation dataloader.
        valid_loader = self.trainer.val_dataloaders
        if not isinstance(valid_loader, (list, tuple)):
            valid_loader = [valid_loader]
        valid_loader = valid_loader[0]
        
        # Grab the first batch.
        batch = next(iter(valid_loader))
        x, _ = batch
        x = x[:15].to(self.device)  # Get the first 15 images.

        self.eval()
        with torch.no_grad():
            x_recon, _, _ = self.forward(x)
            
        originals = x.cpu()
        recons = x_recon.cpu()

        # Create a figure using GridSpec: 3 rows x 10 columns.
        fig = plt.figure(figsize=(15, 9))
        gs = gridspec.GridSpec(3, 10, wspace=0.1, hspace=0.2)

        for i in range(15):
            row = i // 5
            col = i % 5
            # Left block for originals.
            ax_orig = fig.add_subplot(gs[row, col])
            ax_orig.imshow(originals[i].permute(1, 2, 0))
            ax_orig.axis('off')
            if row == 0:
                ax_orig.set_title("Original")
            
            # Right block for reconstructions.
            ax_recon = fig.add_subplot(gs[row, col + 5])
            ax_recon.imshow(recons[i].permute(1, 2, 0))
            ax_recon.axis('off')
            if row == 0:
                ax_recon.set_title("Reconstruction")
                
        plt.suptitle(f"Validation Reconstructions, Epoch {self.current_epoch + 1}", fontsize=16)
        plt.tight_layout()
        plt.show()
        self.train()

    def on_validation_epoch_end(self):
        """
        Visualize reconstruction every sample_interval epochs.
        """
        if (self.current_epoch + 1) % self.sample_interval == 0:
            self.log_validation_reconstructions()
            
    def sample_new_images(self, n=25, plot=True):
        """
        After training the VAE, sample n latent vectors from a standard normal distribution,
        decode them into images, and optionally plot them in a grid.
        
        Args:
            n (int): Number of new images to generate.
            plot (bool): Whether to show the generated images.
        
        Returns:
            Tensor: Generated images.
        """
        self.eval()
        with torch.no_grad():
            # Sample latent vectors from a standard normal distribution.
            z = torch.randn(n, self.latent_dim).to(self.device)
            generated_images = self.decode(z)
        self.train()  # Optionally, return back to training mode.
        
        if plot:
            # Determine grid size.
            grid_rows = math.ceil(math.sqrt(n))
            grid_cols = math.ceil(n / grid_rows)
            fig, axes = plt.subplots(grid_rows, grid_cols, figsize=(grid_cols * 3, grid_rows * 3))
            
            # Flatten axes for easy iteration.
            if grid_rows * grid_cols > 1:
                axes = axes.flatten()
            else:
                axes = [axes]
            
            for i in range(n):
                img = generated_images[i].cpu().permute(1, 2, 0)
                axes[i].imshow(img)
                axes[i].axis('off')
            # Turn off any remaining axes.
            for j in range(n, len(axes)):
                axes[j].axis('off')
            plt.suptitle("Sampled New Images from VAE")
            plt.tight_layout()
            plt.show()
        
        return generated_images

    def configure_optimizers(self):
        return optim.Adam(self.parameters(), lr=self.learning_rate)
In [12]:
from pytorch_lightning import Trainer

# Initialize your model.
model = BetaVAE(latent_dim=128, beta=1.0, learning_rate=1e-3, image_channels=3, sample_interval=5)

# Create a trainer instance.
trainer = Trainer(max_epochs=20)  # Adjust epochs or add other trainer parameters as needed

# Train the model. The validation reconstructions will be shown after every validation epoch.
trainer.fit(model, train_dataloader, valid_dataloader)
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name          | Type       | Params | Mode 
-----------------------------------------------------
0 | encoder       | Sequential | 690 K  | train
1 | fc_mu         | Linear     | 6.4 M  | train
2 | fc_logvar     | Linear     | 6.4 M  | train
3 | decoder_input | Linear     | 6.5 M  | train
4 | decoder       | Sequential | 689 K  | train
-----------------------------------------------------
20.7 M    Trainable params
0         Non-trainable params
20.7 M    Total params
82.792    Total estimated model params size (MB)
Sanity Checking: |          | 0/? [00:00<?, ?it/s]
Training: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
/tmp/ipykernel_23797/471282635.py:145: UserWarning: This figure includes Axes that are not compatible with tight_layout, so results might be incorrect.
  plt.tight_layout()
No description has been provided for this image
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
No description has been provided for this image
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
No description has been provided for this image
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
No description has been provided for this image
`Trainer.fit` stopped: `max_epochs=20` reached.
In [13]:
# After training:
generated_imgs = model.sample_new_images(n=25, plot=True)
No description has been provided for this image

Beta VAEs¶

In [14]:
from pytorch_lightning import Trainer

# Initialize your model.
model = BetaVAE(latent_dim=128, beta=10.0, learning_rate=1e-3, image_channels=3, sample_interval=5)

# Create a trainer instance.
trainer = Trainer(max_epochs=10)  # Adjust epochs or add other trainer parameters as needed

# Train the model. The validation reconstructions will be shown after every validation epoch.
trainer.fit(model, train_dataloader, valid_dataloader)
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name          | Type       | Params | Mode 
-----------------------------------------------------
0 | encoder       | Sequential | 690 K  | train
1 | fc_mu         | Linear     | 6.4 M  | train
2 | fc_logvar     | Linear     | 6.4 M  | train
3 | decoder_input | Linear     | 6.5 M  | train
4 | decoder       | Sequential | 689 K  | train
-----------------------------------------------------
20.7 M    Trainable params
0         Non-trainable params
20.7 M    Total params
82.792    Total estimated model params size (MB)
Sanity Checking: |          | 0/? [00:00<?, ?it/s]
Training: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
/tmp/ipykernel_23797/471282635.py:145: UserWarning: This figure includes Axes that are not compatible with tight_layout, so results might be incorrect.
  plt.tight_layout()
No description has been provided for this image
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
No description has been provided for this image
`Trainer.fit` stopped: `max_epochs=10` reached.
In [15]:
generated_imgs = model.sample_new_images(n=25, plot=True)
No description has been provided for this image
In [16]:
from pytorch_lightning import Trainer

# Initialize your model.
model = BetaVAE(latent_dim=128, beta=50.0, learning_rate=1e-3, image_channels=3, sample_interval=5)

# Create a trainer instance.
trainer = Trainer(max_epochs=10)  # Adjust epochs or add other trainer parameters as needed

# Train the model. The validation reconstructions will be shown after every validation epoch.
trainer.fit(model, train_dataloader, valid_dataloader)
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name          | Type       | Params | Mode 
-----------------------------------------------------
0 | encoder       | Sequential | 690 K  | train
1 | fc_mu         | Linear     | 6.4 M  | train
2 | fc_logvar     | Linear     | 6.4 M  | train
3 | decoder_input | Linear     | 6.5 M  | train
4 | decoder       | Sequential | 689 K  | train
-----------------------------------------------------
20.7 M    Trainable params
0         Non-trainable params
20.7 M    Total params
82.792    Total estimated model params size (MB)
Sanity Checking: |          | 0/? [00:00<?, ?it/s]
Training: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
/tmp/ipykernel_23797/471282635.py:145: UserWarning: This figure includes Axes that are not compatible with tight_layout, so results might be incorrect.
  plt.tight_layout()
No description has been provided for this image
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
No description has been provided for this image
`Trainer.fit` stopped: `max_epochs=10` reached.
In [17]:
generated_imgs = model.sample_new_images(n=25, plot=True)
No description has been provided for this image

Editing Images¶

In [33]:
from pytorch_lightning import Trainer

# Initialize your model.
model = BetaVAE(latent_dim=1024, beta=1.0, learning_rate=1e-3, image_channels=3, sample_interval=2)

# Create a trainer instance.
trainer = Trainer(max_epochs=10)  # Adjust epochs or add other trainer parameters as needed

# Train the model. The validation reconstructions will be shown after every validation epoch.
trainer.fit(model, train_dataloader, valid_dataloader)
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name          | Type       | Params | Mode 
-----------------------------------------------------
0 | encoder       | Sequential | 690 K  | train
1 | fc_mu         | Linear     | 51.4 M | train
2 | fc_logvar     | Linear     | 51.4 M | train
3 | decoder_input | Linear     | 51.4 M | train
4 | decoder       | Sequential | 689 K  | train
-----------------------------------------------------
155 M     Trainable params
0         Non-trainable params
155 M     Total params
622.292   Total estimated model params size (MB)
Sanity Checking: |          | 0/? [00:00<?, ?it/s]
Training: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
/tmp/ipykernel_67770/471282635.py:145: UserWarning: This figure includes Axes that are not compatible with tight_layout, so results might be incorrect.
  plt.tight_layout()
No description has been provided for this image
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
No description has been provided for this image
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
No description has been provided for this image
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
No description has been provided for this image
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
No description has been provided for this image
`Trainer.fit` stopped: `max_epochs=10` reached.
In [34]:
import torch

# Ensure the model is set to evaluation mode.
model.eval()

hat_latents = []
no_hat_latents = []

# Determine the index for "Wearing_Hat" in your attribute vectors
import pandas as pd
attr_csv = 'subset_list_attr_celeba.csv'
df = pd.read_csv(attr_csv)
attribute_columns = list(df.columns)
attribute_columns.remove("image_id")
wearing_hat_index = attribute_columns.index("Wearing_Hat")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

with torch.no_grad():
    for imgs, attrs in train_dataloader:
        imgs = imgs.to(device)
        # Get the latent mu vector (using model.forward returns (x_recon, mu, logvar))
        _, mu, _ = model.forward(imgs)
        mu = mu.cpu()
        for i in range(mu.size(0)):
            # Assume attribute values are 1 for wearing a hat and -1 for not wearing.
            if attrs[i][wearing_hat_index].item() == 1:
                hat_latents.append(mu[i])
            elif attrs[i][wearing_hat_index].item() == -1:
                no_hat_latents.append(mu[i])

hat_latents = torch.stack(hat_latents)      # shape: (num_hat, latent_dim)
no_hat_latents = torch.stack(no_hat_latents)  # shape: (num_no_hat, latent_dim)

mean_hat = torch.mean(hat_latents, dim=0)
mean_no_hat = torch.mean(no_hat_latents, dim=0)

print("Mean latent vector for images with hat:", mean_hat)
print("Mean latent vector for images without hat:", mean_no_hat)
Mean latent vector for images with hat: tensor([ 0.0091,  0.2144,  0.0906,  ...,  0.1839, -0.0138, -0.1630])
Mean latent vector for images without hat: tensor([-0.0244,  0.1517,  0.1202,  ...,  0.1877, -0.0463, -0.1592])
In [35]:
import matplotlib.pyplot as plt

# Ensure the model is in evaluation mode.
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

with torch.no_grad():
    # Unsqueeze to add a batch dimension (from shape [latent_dim] to [1, latent_dim])
    latent_hat = mean_hat.unsqueeze(0).to(device)
    latent_no_hat = mean_no_hat.unsqueeze(0).to(device)
    
    # Decode the latent vectors to generate images.
    img_hat = model.decode(latent_hat)         # shape: [1, C, H, W]
    img_no_hat = model.decode(latent_no_hat)     # shape: [1, C, H, W]

# Move the images to CPU and convert from tensor to numpy array.
img_hat_np = img_hat[0].cpu().permute(1, 2, 0).numpy()       # shape: [H, W, C]
img_no_hat_np = img_no_hat[0].cpu().permute(1, 2, 0).numpy()   # shape: [H, W, C]

# Plot the images side by side.
fig, axes = plt.subplots(1, 2, figsize=(10, 5))

axes[0].imshow(img_hat_np)
axes[0].set_title("Decoded: Wearing Hat")
axes[0].axis("off")

axes[1].imshow(img_no_hat_np)
axes[1].set_title("Decoded: Not Wearing Hat")
axes[1].axis("off")

plt.show()
No description has been provided for this image
In [36]:
hat_diff = mean_hat - mean_no_hat

print(hat_diff)
tensor([ 0.0334,  0.0627, -0.0296,  ..., -0.0038,  0.0325, -0.0038])
In [37]:
import matplotlib.pyplot as plt
import torch

# Set the model to evaluation mode.
model.eval()

# Ensure hat_diff has shape [1, latent_dim] by unsqueezing a batch dimension.
device = next(model.parameters()).device
hat_diff_batch = hat_diff.unsqueeze(0).to(device)

with torch.no_grad():
    # Decode the latent vector.
    decoded_hat_diff = model.decode(hat_diff_batch)

# The output has shape [1, C, H, W]. Remove the batch dimension and permute axes to H x W x C.
decoded_img = decoded_hat_diff[0].cpu().permute(1, 2, 0).numpy()

# Plot the decoded image.
plt.figure(figsize=(5, 5))
plt.imshow(decoded_img)
plt.title("Decoded Hat Difference")
plt.axis("off")
plt.show()
No description has been provided for this image
In [38]:
import matplotlib.pyplot as plt
import numpy as np

# Retrieve a sample from the dataset. For example, the first sample
sample_img, sample_attr = dataset[0]

# Print the attribute names with their corresponding attribute values for the sample
print("\nSample Image Attributes:")
for name, value in zip(attribute_names, sample_attr.tolist()):
    print(f"{name}: {value}")

# Convert the sample image tensor from (C, H, W) to (H, W, C) for visualization
img_np = sample_img.permute(1, 2, 0).numpy()

# Display the sample image
plt.figure(figsize=(6, 6))
plt.imshow(img_np)
plt.title("Sample Image")
plt.axis('off')
plt.show()
Sample Image Attributes:
5_o_Clock_Shadow: 1.0
Arched_Eyebrows: -1.0
Attractive: 1.0
Bags_Under_Eyes: 1.0
Bald: -1.0
Bangs: -1.0
Big_Lips: 1.0
Big_Nose: 1.0
Black_Hair: 1.0
Blond_Hair: -1.0
Blurry: -1.0
Brown_Hair: -1.0
Bushy_Eyebrows: 1.0
Chubby: -1.0
Double_Chin: -1.0
Eyeglasses: -1.0
Goatee: -1.0
Gray_Hair: -1.0
Heavy_Makeup: -1.0
High_Cheekbones: -1.0
Male: 1.0
Mouth_Slightly_Open: -1.0
Mustache: -1.0
Narrow_Eyes: -1.0
No_Beard: 1.0
Oval_Face: -1.0
Pale_Skin: -1.0
Pointy_Nose: 1.0
Receding_Hairline: -1.0
Rosy_Cheeks: -1.0
Sideburns: -1.0
Smiling: -1.0
Straight_Hair: 1.0
Wavy_Hair: -1.0
Wearing_Earrings: -1.0
Wearing_Hat: -1.0
Wearing_Lipstick: -1.0
Wearing_Necklace: -1.0
Wearing_Necktie: -1.0
Young: 1.0
No description has been provided for this image
In [39]:
import matplotlib.pyplot as plt
import torch

# Assume 'model' is your trained VAE and it's already on the proper device.
# Also, hat_diff is computed from your training data (hat_diff = mean_hat - mean_no_hat).

model.eval()
device = next(model.parameters()).device

# Retrieve a sample from the dataset (an image with no hat).
sample_img, sample_attr = dataset[0]  # sample_img shape: (C, H, W)

# Prepare the sample image: add a batch dimension and move to device.
sample_img_batch = sample_img.unsqueeze(0).to(device)

# Encode the image to obtain its latent representation.
with torch.no_grad():
    # The forward pass returns (reconstructed, mu, logvar).
    _, mu, _ = model(sample_img_batch)

# Choose a scaling factor for the hat editing. Adjust as needed.
alpha = 2.0

# Make sure hat_diff is moved to the same device and add a batch dimension.
hat_diff_batch = hat_diff.to(device).unsqueeze(0)

# Edit the latent code by adding the scaled hat difference.
edited_latent = mu + alpha * hat_diff_batch

# Decode the modified latent vector to get the edited image.
with torch.no_grad():
    # Use the decoder directly. Some implementations may require you to call model.forward on the latent.
    edited_img = model.decode(edited_latent)

# Remove the batch dimension and convert to a NumPy array for plotting.
edited_img_np = edited_img[0].cpu().permute(1, 2, 0).numpy()
original_img_np = sample_img.cpu().permute(1, 2, 0).numpy()

# Plot the original and edited images side by side.
plt.figure(figsize=(12, 6))

plt.subplot(1, 2, 1)
plt.imshow(original_img_np)
plt.title("Original Image (No Hat)")
plt.axis("off")

plt.subplot(1, 2, 2)
plt.imshow(edited_img_np)
plt.title("Edited Image with Hat")
plt.axis("off")

plt.show()
No description has been provided for this image

Conditional VAEs¶

In [72]:
import math
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import pytorch_lightning as pl
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec

class ConditionalVAE(pl.LightningModule):
    def __init__(
        self, 
        latent_dim=64, 
        attribute_dim=40, 
        beta=4.0, 
        learning_rate=1e-3, 
        image_channels=3, 
        sample_interval=5
    ):
        """
        Args:
            latent_dim (int): Dimensionality of the latent space.
            attribute_dim (int): Number of attributes.
            beta (float): Scaling factor for the KL divergence.
            learning_rate (float): Optimizer learning rate.
            image_channels (int): Number of channels in the input images.
            sample_interval (int): Interval (in epochs) to visualize reconstructions.
        """
        super(ConditionalVAE, self).__init__()
        self.latent_dim = latent_dim
        self.attribute_dim = attribute_dim
        self.beta = beta
        self.learning_rate = learning_rate
        self.image_channels = image_channels
        self.sample_interval = sample_interval

        # --- Encoder ---
        self.encoder = nn.Sequential(
            nn.Conv2d(image_channels, 32, kernel_size=4, stride=2, padding=1),  # 224 -> 112
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=1),              # 112 -> 56
            nn.ReLU(),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),             # 56 -> 28
            nn.ReLU(),
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),            # 28 -> 14
            nn.ReLU()
        )
        self.flatten_dim = 256 * 14 * 14
        self.fc_mu = nn.Linear(self.flatten_dim, latent_dim)
        self.fc_logvar = nn.Linear(self.flatten_dim, latent_dim)
        
        # --- Conditional Decoder ---
        # Now the input is [latent code, attribute vector] with total dimension = latent_dim + attribute_dim.
        self.fc_dec = nn.Linear(latent_dim + attribute_dim, self.flatten_dim)
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1),   # 14 -> 28
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),    # 28 -> 56
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1),     # 56 -> 112
            nn.ReLU(),
            nn.ConvTranspose2d(32, image_channels, kernel_size=4, stride=2, padding=1),  # 112 -> 224
            nn.Sigmoid()  # Images are assumed normalized to [0, 1].
        )
    
    def encode(self, x):
        h = self.encoder(x)
        h = h.view(h.size(0), -1)
        mu = self.fc_mu(h)
        logvar = self.fc_logvar(h)
        return mu, logvar
    
    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std
    
    def decode(self, z_cond):
        h = self.fc_dec(z_cond)
        h = h.view(-1, 256, 14, 14)
        x_recon = self.decoder(h)
        return x_recon
    
    def forward(self, x, attrs):
        """
        Args:
            x (Tensor): Images of shape (B, C, H, W).
            attrs (Tensor): Attribute vectors of shape (B, attribute_dim).
        Returns:
            x_recon, mu, logvar
        """
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        # Concatenate the latent code and attribute vector.
        z_cond = torch.cat([z, attrs], dim=1)
        x_recon = self.decode(z_cond)
        return x_recon, mu, logvar
    
    def training_step(self, batch, batch_idx):
        x, attrs = batch
        x_recon, mu, logvar = self.forward(x, attrs)
        recon_loss = F.binary_cross_entropy(x_recon, x, reduction='sum')
        kl_div = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
        loss = recon_loss + self.beta * kl_div
        self.log("train_loss", loss, prog_bar=True)
        return loss
    
    def validation_step(self, batch, batch_idx):
        x, attrs = batch
        x_recon, mu, logvar = self.forward(x, attrs)
        recon_loss = F.binary_cross_entropy(x_recon, x, reduction='sum')
        kl_div = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
        loss = recon_loss + self.beta * kl_div
        self.log("val_loss", loss, prog_bar=True)
        return loss
    
    def log_validation_reconstructions(self):
        valid_loader = self.trainer.val_dataloaders
        if not isinstance(valid_loader, (list, tuple)):
            valid_loader = [valid_loader]
        valid_loader = valid_loader[0]
        batch = next(iter(valid_loader))
        x, attrs = batch
        x = x[:15].to(self.device)
        attrs = attrs[:15].to(self.device)
        self.eval()
        with torch.no_grad():
            x_recon, _, _ = self.forward(x, attrs)
        originals = x.cpu()
        recons = x_recon.cpu()
        fig = plt.figure(figsize=(15, 9))
        gs = gridspec.GridSpec(3, 10, wspace=0.1, hspace=0.2)
        for i in range(15):
            row = i // 5
            col = i % 5
            ax_orig = fig.add_subplot(gs[row, col])
            ax_orig.imshow(originals[i].permute(1, 2, 0))
            ax_orig.axis("off")
            if row == 0:
                ax_orig.set_title("Original")
            ax_recon = fig.add_subplot(gs[row, col + 5])
            ax_recon.imshow(recons[i].permute(1, 2, 0))
            ax_recon.axis("off")
            if row == 0:
                ax_recon.set_title("Reconstruction")
        plt.suptitle(f"Validation Reconstructions, Epoch {self.current_epoch + 1}", fontsize=16)
        plt.tight_layout()
        plt.show()
        self.train()
    
    def on_validation_epoch_end(self):
        # Check if we are at the correct epoch to produce our plots.
        if (self.current_epoch + 1) % self.sample_interval != 0:
            return

        # ----- Plot 1: 9 Unconditionally Generated Images -----
        self.eval()
        with torch.no_grad():
            z = torch.randn(9, self.latent_dim).to(self.device)
            # For unconditional generation, use a random attribute vector (each attribute randomly chosen as -1 or 1).
            random_attrs = torch.randint(0, 2, (9, self.attribute_dim), device=self.device) * 2 - 1
            z_cond = torch.cat([z, random_attrs], dim=1)
            unconditional_images = self.decode(z_cond)
        # Plot in a 3x3 grid.
        grid_rows = 3
        grid_cols = 3
        fig, axes = plt.subplots(grid_rows, grid_cols, figsize=(grid_cols * 3, grid_rows * 3))
        axes = axes.flatten()
        for i in range(9):
            img = unconditional_images[i].cpu().permute(1, 2, 0).numpy()
            axes[i].imshow(img)
            axes[i].axis("off")
        plt.suptitle(f"Epoch {self.current_epoch + 1}: 9 Unconditionally Generated Images", fontsize=16)
        plt.tight_layout()
        plt.show()

        # ----- Plot 2: 9 Images with Condition Wearing_Hat = 1 -----
        hat_images = []
        with torch.no_grad():
            for i in range(9):
                # Sample a random latent code.
                latent = torch.randn(self.latent_dim).to(self.device)
                # For specified condition, force "Wearing_Hat" to 1.
                # For any attribute not specified, generate a random value (-1 or 1).
                # Here we use our generate_image helper.
                gen_img = self.generate_image(latent, {"Wearing_Hat": 1}, attribute_names)
                hat_images.append(gen_img[0])
        # Plot these images in a 3x3 grid.
        fig, axes = plt.subplots(3, 3, figsize=(9, 9))
        axes = axes.flatten()
        for i, img in enumerate(hat_images):
            axes[i].imshow(img.cpu().permute(1, 2, 0).numpy())
            axes[i].axis("off")
        plt.suptitle(f"Epoch {self.current_epoch + 1}: 9 Generated Images with Wearing_Hat = 1", fontsize=16)
        plt.tight_layout()
        plt.show()

        # ----- Plot 3: Edit a Specific Image (Remove the Hat) -----
        # Assume df_subset is a DataFrame corresponding to the attribute CSV for your dataset,
        # and that it contains a column 'Wearing_Hat'.
        hat_df = df_subset[df_subset['Wearing_Hat'] == 1]
        if hat_df.empty:
            print("No images with Wearing_Hat attribute equal to 1 found.")
        else:
            # Retrieve the index of the first image in df_subset that has Wearing_Hat == 1.
            sample_index = hat_df.index[1]
            sample_img, sample_attr = dataset[sample_index]
            #print("\nAttributes for the image with Wearing_Hat = 1:")
            #for name, value in zip(attribute_names, sample_attr.tolist()):
            #    print(f"{name}: {value}")
            # Edit the image: change Wearing_Hat from 1 to -1.
            edited_img = self.edit_image(sample_img, sample_attr, {"Wearing_Hat": -1}, attribute_names)
            # Plot the original and edited image side by side.
            fig, axes = plt.subplots(1, 2, figsize=(12, 6))
            axes[0].imshow(sample_img.cpu().permute(1, 2, 0).numpy())
            axes[0].set_title("Original Image (Wearing_Hat = 1)")
            axes[0].axis("off")
            axes[1].imshow(edited_img[0].cpu().permute(1, 2, 0).numpy())
            axes[1].set_title("Edited Image (Wearing_Hat = -1)")
            axes[1].axis("off")
            plt.suptitle(f"Epoch {self.current_epoch + 1}: Image Editing", fontsize=16)
            plt.tight_layout()
            plt.show()
        self.train()
    
    def sample_new_images(self, n=25, plot=True):
        """
        Sample n images by drawing latent codes from a normal distribution,
        and generate a full attribute vector by randomly choosing -1 or 1 for each attribute.
        """
        self.eval()
        with torch.no_grad():
            z = torch.randn(n, self.latent_dim).to(self.device)
            # Random attribute vector for each image: for each attribute choose either -1 or 1.
            random_attrs = torch.randint(0, 2, (n, self.attribute_dim), device=self.device) * 2 - 1
            z_cond = torch.cat([z, random_attrs], dim=1)
            images = self.decode(z_cond)
        self.train()
        if plot:
            grid_rows = math.ceil(math.sqrt(n))
            grid_cols = math.ceil(n / grid_rows)
            fig, axes = plt.subplots(grid_rows, grid_cols, figsize=(grid_cols * 3, grid_rows * 3))
            if grid_rows * grid_cols > 1:
                axes = axes.flatten()
            else:
                axes = [axes]
            for i in range(n):
                img = images[i].cpu().permute(1, 2, 0)
                axes[i].imshow(img)
                axes[i].axis("off")
            for j in range(n, len(axes)):
                axes[j].axis("off")
            plt.suptitle("Sampled New Images from Conditional VAE")
            plt.tight_layout()
            plt.show()
        return images
    
    def generate_image(self, latent, attr_dict, attribute_names):
        """
        Generate an image given a latent code and a dictionary of attribute values.
        For any attribute not specified in attr_dict, randomly choose -1 or 1.
        
        Args:
            latent (Tensor): A latent code of shape [latent_dim] or [1, latent_dim].
            attr_dict (dict): Maps attribute names to desired values (-1 or 1).
            attribute_names (list of str): List of attribute names in the expected order.
            
        Returns:
            Tensor: Generated image of shape [1, image_channels, H, W].
        """
        if latent.dim() == 1:
            latent = latent.unsqueeze(0)
        # Generate a random attribute vector.
        random_attrs = torch.randint(0, 2, (1, self.attribute_dim), device=latent.device) * 2 - 1
        # Update the vector based on attr_dict.
        for name, value in attr_dict.items():
            if name in attribute_names:
                idx = attribute_names.index(name)
                random_attrs[0, idx] = float(value)
            else:
                print(f"Warning: Attribute '{name}' not found in attribute_names list.")
        z_cond = torch.cat([latent, random_attrs], dim=1)
        with torch.no_grad():
            gen_img = self.decode(z_cond)
        return gen_img
    
    def edit_image(self, image, current_attrs, attr_dict, attribute_names):
        """
        Edit an existing image by encoding it and then modifying its attribute vector.
        The user must supply the current attribute vector for the image and only the attributes to change.
        
        Args:
            image (Tensor): Input image tensor of shape (C, H, W).
            current_attrs (Tensor): Current attribute vector of shape (attribute_dim,) or (1, attribute_dim).
            attr_dict (dict): Dictionary mapping attribute names to desired values (-1 or 1).
            attribute_names (list of str): List of attribute names in order.
            
        Returns:
            Tensor: Edited image of shape [1, image_channels, H, W].
        """
        if image.dim() == 3:
            image = image.unsqueeze(0)
        image = image.to(next(self.parameters()).device)
        self.eval()
        with torch.no_grad():
            mu, _ = self.encode(image)
        self.train()
        if current_attrs.dim() == 1:
            current_attrs = current_attrs.unsqueeze(0)
        new_attrs = current_attrs.clone().to(mu.device)
        for name, value in attr_dict.items():
            if name in attribute_names:
                idx = attribute_names.index(name)
                new_attrs[0, idx] = float(value)
            else:
                print(f"Warning: Attribute '{name}' not found in attribute_names list.")
        z_cond = torch.cat([mu, new_attrs], dim=1)
        with torch.no_grad():
            edited_img = self.decode(z_cond)
        return edited_img
    
    def configure_optimizers(self):
        return optim.Adam(self.parameters(), lr=self.learning_rate)
In [82]:
# Set attribute_dim to match the number of attribute columns in your CSV (e.g., 40).
attribute_dim = 40

# Create an instance of the Conditional VAE.
model = ConditionalVAE(
    latent_dim=128,
    attribute_dim=attribute_dim,
    beta=4.0,
    learning_rate=1e-3,
    image_channels=3,
    sample_interval=5
)

# Initialize a Lightning Trainer.
trainer = Trainer(
    max_epochs=50, 
)

# Start training.
trainer.fit(model, train_dataloader, valid_dataloader)
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name      | Type       | Params | Mode 
-------------------------------------------------
0 | encoder   | Sequential | 690 K  | train
1 | fc_mu     | Linear     | 6.4 M  | train
2 | fc_logvar | Linear     | 6.4 M  | train
3 | fc_dec    | Linear     | 8.5 M  | train
4 | decoder   | Sequential | 689 K  | train
-------------------------------------------------
22.7 M    Trainable params
0         Non-trainable params
22.7 M    Total params
90.820    Total estimated model params size (MB)
Sanity Checking: |          | 0/? [00:00<?, ?it/s]
Training: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
Validation: |          | 0/? [00:00<?, ?it/s]
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
/home/kmcalist/.local/lib/python3.10/site-packages/pytorch_lightning/trainer/call.py:54: Detected KeyboardInterrupt, attempting graceful shutdown...

Density Ratios¶

In [74]:
import numpy as np
import seaborn as sns
#Generate Random Samples from a Mixture of two normal distributions
# Define the parameters for the two normal distributions
mean1 = 0
variance1 = 1

mean2 = 5
variance2 = 2

# Generate random draws from the mixture distribution
n = 1000
weights = [0.95, 0.05]  # Equal weights for the two distributions

np.random.seed(42)

# Generate random indices to select which distribution to sample from
indices = np.random.choice([0, 1], size=n, p=weights)

# Generate random samples from the mixture distribution
p = np.zeros(n)
for i in range(n):
    if indices[i] == 0:
        p[i] = np.random.normal(mean1, np.sqrt(variance1))
    else:
        p[i] = np.random.normal(mean2, np.sqrt(variance2))

import matplotlib.pyplot as plt

fig, axs = plt.subplots(1, 2, figsize=(12, 4))

# Plot histogram of samples
axs[0].hist(p, bins=10, color='skyblue', edgecolor='black')
axs[0].set_title('Histogram of Samples')
axs[0].set_xlabel('Value')
axs[0].set_ylabel('Frequency')

# Plot rug plot of samples
sns.rugplot(p, ax=axs[1], height=0.2, color='skyblue')
axs[1].set_title('Rug Plot of Samples')
axs[1].set_xlabel('Value')
axs[1].set_ylabel('')

plt.tight_layout()
plt.show()
No description has been provided for this image
In [75]:
import numpy as np

def generate_mixture_samples(means, variances, weights, n):
    # Generate random indices to select which distribution to sample from
    indices = np.random.choice(len(means), size=n, p=weights)
    
    # Generate random samples from the mixture distribution
    samples = np.zeros(n)
    for i in range(n):
        samples[i] = np.random.normal(means[indices[i]], np.sqrt(variances[indices[i]]))
    
    return samples
In [76]:
import numpy as np
from scipy.stats import norm

def compute_density_ratio(candidate_means, candidate_variances, candidate_weights, true_means, true_variances, true_weights):
    # Define the grid of values
    grid = np.linspace(-5, 5, 1000)
    
    # Compute the density ratio for each value in the grid
    density_ratio = np.zeros_like(grid)
    for i, value in enumerate(grid):
        candidate_pdf = np.sum([weight * norm.pdf(value, mean, np.sqrt(variance)) for mean, variance, weight in zip(candidate_means, candidate_variances, candidate_weights)])
        true_pdf = np.sum([weight * norm.pdf(value, mean, np.sqrt(variance)) for mean, variance, weight in zip(true_means, true_variances, true_weights)])
        density_ratio[i] = candidate_pdf / true_pdf
    
    return density_ratio
In [77]:
import seaborn as sns
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import log_loss
q = generate_mixture_samples([1],[1],[1],1000)
dr = compute_density_ratio([1],[1],[1],[0,5],[1,2],[.95,.05])
# Concatenate p and q into a single vector x
x = np.concatenate((p, q))

# Generate vector y
y = np.concatenate((np.ones_like(p), np.zeros_like(q)))

# Create an instance of LogisticRegression with no penalty
logreg = LogisticRegression(penalty='none')

# Fit the logistic regression model
logreg.fit(x.reshape(-1, 1), y)

# Predict the probabilities for the samples
y_pred = logreg.predict_proba(x.reshape(-1, 1))

# Compute the binary cross-entropy loss
loss = -log_loss(y, y_pred)

import matplotlib.pyplot as plt
fig, axs = plt.subplots(1, 3, figsize=(18, 10))

# Plot histogram of p and q samples colored by source
axs[0].hist(p, bins=10, color='skyblue', edgecolor='black', alpha=0.5, label='p')
axs[0].hist(q, bins=10, color='orange', edgecolor='black', alpha=0.5, label='q')
axs[0].set_title('Histogram of Samples')
axs[0].set_xlabel('Value')
axs[0].set_ylabel('Frequency')
axs[0].legend()

# Plot rug plot of p and q samples colored by source
sns.rugplot(p, ax=axs[1], height=0.2, color='skyblue', alpha=0.5, label='Real')
sns.rugplot(q, ax=axs[1], height=0.2, color='orange', alpha=0.5, label='Fake')
axs[1].set_title('Rug Plot of Samples')
axs[1].set_xlabel('Value')
axs[1].set_ylabel('')
axs[1].legend()

# Plot logistic regression curve
x_range = np.linspace(min(x), max(x), 100)
y_range = logreg.predict_proba(x_range.reshape(-1, 1))[:, 1]
axs[1].plot(x_range, y_range, color='red', label='Logistic Regression')
axs[1].legend()
# Print loss in the top left corner
axs[1].text(0.05, 0.95, f'Loss: {loss:.4f}', transform=axs[1].transAxes, verticalalignment='top', bbox=dict(facecolor='white', edgecolor='black', boxstyle='round'))

# Plot dr against np.linspace(-5, 5, 1000)
axs[2].plot(np.linspace(-5, 5, 1000), dr)
axs[2].set_title('Density Ratio')
axs[2].set_xlabel('Value')
axs[2].set_ylabel('Density Ratio')




plt.tight_layout()
plt.show()
/home/kmcalist/.local/lib/python3.10/site-packages/sklearn/linear_model/_logistic.py:1183: FutureWarning: `penalty='none'`has been deprecated in 1.2 and will be removed in 1.4. To keep the past behaviour, set `penalty=None`.
  warnings.warn(
No description has been provided for this image
In [78]:
import seaborn as sns
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import log_loss
q = generate_mixture_samples([0],[1],[1],1000)
dr = compute_density_ratio([0],[1],[1],[0,5],[1,2],[.95,.05])
# Concatenate p and q into a single vector x
x = np.concatenate((p, q))

# Generate vector y
y = np.concatenate((np.ones_like(p), np.zeros_like(q)))

# Create an instance of LogisticRegression with no penalty
logreg = LogisticRegression(penalty='none')

# Fit the logistic regression model
logreg.fit(x.reshape(-1, 1), y)

# Predict the probabilities for the samples
y_pred = logreg.predict_proba(x.reshape(-1, 1))

# Compute the binary cross-entropy loss
loss = -log_loss(y, y_pred)

import matplotlib.pyplot as plt
fig, axs = plt.subplots(1, 3, figsize=(18, 10))

# Plot histogram of p and q samples colored by source
axs[0].hist(p, bins=10, color='skyblue', edgecolor='black', alpha=0.5, label='p')
axs[0].hist(q, bins=10, color='orange', edgecolor='black', alpha=0.5, label='q')
axs[0].set_title('Histogram of Samples')
axs[0].set_xlabel('Value')
axs[0].set_ylabel('Frequency')
axs[0].legend()

# Plot rug plot of p and q samples colored by source
sns.rugplot(p, ax=axs[1], height=0.2, color='skyblue', alpha=0.5, label='Real')
sns.rugplot(q, ax=axs[1], height=0.2, color='orange', alpha=0.5, label='Fake')
axs[1].set_title('Rug Plot of Samples')
axs[1].set_xlabel('Value')
axs[1].set_ylabel('')
axs[1].legend()

# Plot logistic regression curve
x_range = np.linspace(min(x), max(x), 100)
y_range = logreg.predict_proba(x_range.reshape(-1, 1))[:, 1]
axs[1].plot(x_range, y_range, color='red', label='Logistic Regression')
axs[1].legend()
# Print loss in the top left corner
axs[1].text(0.05, 0.95, f'Loss: {loss:.4f}', transform=axs[1].transAxes, verticalalignment='top', bbox=dict(facecolor='white', edgecolor='black', boxstyle='round'))

# Plot dr against np.linspace(-5, 5, 1000)
axs[2].plot(np.linspace(-5, 5, 1000), dr)
axs[2].set_title('Density Ratio')
axs[2].set_xlabel('Value')
axs[2].set_ylabel('Density Ratio')




plt.tight_layout()
plt.show()
/home/kmcalist/.local/lib/python3.10/site-packages/sklearn/linear_model/_logistic.py:1183: FutureWarning: `penalty='none'`has been deprecated in 1.2 and will be removed in 1.4. To keep the past behaviour, set `penalty=None`.
  warnings.warn(
No description has been provided for this image
In [80]:
import seaborn as sns
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import log_loss
q = generate_mixture_samples([0],[1.5],[1],1000)
dr = compute_density_ratio([0],[1],[1.5],[0,5],[1,2],[.95,.05])
# Concatenate p and q into a single vector x
x = np.concatenate((p, q))

# Generate vector y
y = np.concatenate((np.ones_like(p), np.zeros_like(q)))

# Create an instance of LogisticRegression with no penalty
logreg = LogisticRegression(penalty='none')

# Fit the logistic regression model
logreg.fit(x.reshape(-1, 1), y)

# Predict the probabilities for the samples
y_pred = logreg.predict_proba(x.reshape(-1, 1))

# Compute the binary cross-entropy loss
loss = -log_loss(y, y_pred)

import matplotlib.pyplot as plt
fig, axs = plt.subplots(1, 3, figsize=(18, 10))

# Plot histogram of p and q samples colored by source
axs[0].hist(p, bins=10, color='skyblue', edgecolor='black', alpha=0.5, label='p')
axs[0].hist(q, bins=10, color='orange', edgecolor='black', alpha=0.5, label='q')
axs[0].set_title('Histogram of Samples')
axs[0].set_xlabel('Value')
axs[0].set_ylabel('Frequency')
axs[0].legend()

# Plot rug plot of p and q samples colored by source
sns.rugplot(p, ax=axs[1], height=0.2, color='skyblue', alpha=0.5, label='Real')
sns.rugplot(q, ax=axs[1], height=0.2, color='orange', alpha=0.5, label='Fake')
axs[1].set_title('Rug Plot of Samples')
axs[1].set_xlabel('Value')
axs[1].set_ylabel('')
axs[1].legend()

# Plot logistic regression curve
x_range = np.linspace(min(x), max(x), 100)
y_range = logreg.predict_proba(x_range.reshape(-1, 1))[:, 1]
axs[1].plot(x_range, y_range, color='red', label='Logistic Regression')
axs[1].legend()
# Print loss in the top left corner
axs[1].text(0.05, 0.95, f'Loss: {loss:.4f}', transform=axs[1].transAxes, verticalalignment='top', bbox=dict(facecolor='white', edgecolor='black', boxstyle='round'))

# Plot dr against np.linspace(-5, 5, 1000)
axs[2].plot(np.linspace(-5, 5, 1000), dr)
axs[2].set_title('Density Ratio')
axs[2].set_xlabel('Value')
axs[2].set_ylabel('Density Ratio')




plt.tight_layout()
plt.show()
/home/kmcalist/.local/lib/python3.10/site-packages/sklearn/linear_model/_logistic.py:1183: FutureWarning: `penalty='none'`has been deprecated in 1.2 and will be removed in 1.4. To keep the past behaviour, set `penalty=None`.
  warnings.warn(
No description has been provided for this image
In [81]:
import seaborn as sns
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import log_loss
q = generate_mixture_samples([0,5],[1,2],[.95,.05],1000)
dr = compute_density_ratio([0,5],[1,2],[.95,.05],[0,5],[1,2],[.95,.05])
# Concatenate p and q into a single vector x
x = np.concatenate((p, q))

# Generate vector y
y = np.concatenate((np.ones_like(p), np.zeros_like(q)))

# Create an instance of LogisticRegression with no penalty
logreg = LogisticRegression(penalty='none')

# Fit the logistic regression model
logreg.fit(x.reshape(-1, 1), y)

# Predict the probabilities for the samples
y_pred = logreg.predict_proba(x.reshape(-1, 1))

# Compute the binary cross-entropy loss
loss = -log_loss(y, y_pred)

import matplotlib.pyplot as plt
fig, axs = plt.subplots(1, 3, figsize=(18, 10))

# Plot histogram of p and q samples colored by source
axs[0].hist(p, bins=10, color='skyblue', edgecolor='black', alpha=0.5, label='p')
axs[0].hist(q, bins=10, color='orange', edgecolor='black', alpha=0.5, label='q')
axs[0].set_title('Histogram of Samples')
axs[0].set_xlabel('Value')
axs[0].set_ylabel('Frequency')
axs[0].legend()

# Plot rug plot of p and q samples colored by source
sns.rugplot(p, ax=axs[1], height=0.2, color='skyblue', alpha=0.5, label='Real')
sns.rugplot(q, ax=axs[1], height=0.2, color='orange', alpha=0.5, label='Fake')
axs[1].set_title('Rug Plot of Samples')
axs[1].set_xlabel('Value')
axs[1].set_ylabel('')
axs[1].legend()

# Plot logistic regression curve
x_range = np.linspace(min(x), max(x), 100)
y_range = logreg.predict_proba(x_range.reshape(-1, 1))[:, 1]
axs[1].plot(x_range, y_range, color='red', label='Logistic Regression')
axs[1].legend()
# Print loss in the top left corner
axs[1].text(0.05, 0.95, f'Loss: {loss:.4f}', transform=axs[1].transAxes, verticalalignment='top', bbox=dict(facecolor='white', edgecolor='black', boxstyle='round'))

# Plot dr against np.linspace(-5, 5, 1000)
axs[2].plot(np.linspace(-5, 5, 1000), dr)
axs[2].set_title('Density Ratio')
axs[2].set_xlabel('Value')
axs[2].set_ylabel('Density Ratio')




plt.tight_layout()
plt.show()
/home/kmcalist/.local/lib/python3.10/site-packages/sklearn/linear_model/_logistic.py:1183: FutureWarning: `penalty='none'`has been deprecated in 1.2 and will be removed in 1.4. To keep the past behaviour, set `penalty=None`.
  warnings.warn(
No description has been provided for this image